# ----------------------------------------------------------------------
# Split rx / m / diag files into daily units
# ----------------------------------------------------------------------

load_library = c('bit64','data.table','fst','future.apply','stringr','logger','vroom')
invisible(lapply(load_library, function(x) library(x, character.only=TRUE, quietly= TRUE)))
library(parallel)

# Read the four required positional command-line arguments:
#   1. infile    - path of the input .fst file that is split
#   2. outfile   - path of the row-count summary written with fwrite at the end
#   3. data_type - 'r', 'diag' or 'm'; selects the columns that are read
#   4. universe  - used as output sub-directory and as output file-name prefix
args = commandArgs(trailingOnly = TRUE)
if (length(args) < 4) {
	# Fail with a usage hint instead of a bare "subscript out of bounds".
	stop('expected 4 arguments (infile, outfile, data_type, universe) but got ',
		length(args), call. = FALSE)
}
infile = args[[1]]
outfile = args[[2]]
data_type = args[[3]]
universe = args[[4]]

# Daily files go to <bucket>/derived_v4_202101/daily_fst/<universe>.
bucket = '/N/project/iuni_doctorshopping'
target_dir = file.path(bucket,'derived_v4_202101','daily_fst',universe)

# Choose, per data type, which columns are read from the input file
# (select_col) and which of them holds the date used for splitting
# (date_column). Any other data type aborts the script.
if (data_type == 'r'){
	# NOTE(review): 'r' presumably corresponds to the "rx" files named in the
	# file header - confirm.
	select_col = c('PATID','PAT_PLANID',"FILL_DT","PRESCRIPT_ID",
	"DEA","NPI","PRESCRIBER_PROV","PHARM","SPCLT_IND",
	"NDC","PRC_TYP","AHFSCLSS","FORM_IND","FORM_TYP","MAIL_IND",	
	"FST_FILL","RFL_NBR","QUANTITY","DAYS_SUP","STRENGTH")
	date_column = 'FILL_DT'

} else if (data_type == 'diag'){
	select_col = c('PATID','PAT_PLANID',"FST_DT",
	"CLMID","DIAG","ICD_FLAG","LOC_CD","POA")
	date_column = 'FST_DT'

} else if (data_type == 'm') {
	select_col = c('PATID','PAT_PLANID','FST_DT','OP_VISIT_ID',
	'ADMIT_CHAN','ADMIT_TYPE','ICD_FLAG','LOC_CD','NDC','POS','PROC_CD','PROCMOD',
	'BILL_PROV','SERVICE_PROV','REFER_PROV','PROV','PROVCAT','TOS_CD','TOS_EXT','UNITS','ALT_UNITS',
	'CLMID','CLMSEQ','CONF_ID','DSTATUS')
	date_column = 'FST_DT'
} else {
	# The message previously read "now implemented yet", which says the
	# opposite of what is meant.
	stop('not implemented yet for this data type ', data_type)
}

# Load only the columns chosen for this data type, as a data.table.
logger::log_info('now reading data files ...')
dt = fst::read_fst(
	path = infile,
	columns = select_col,
	as.data.table = TRUE
)

# Extract the rows whose date column equals one value and either return them
# or write them to an .fst file.
#
# Args:
#   yd:          a single value of the date column; rows equal to it are kept.
#   date_column: name (string) of the column compared against `yd`.
#   outfile:     NULL to return the subset; otherwise the path of the .fst
#                file to write.
#   data:        table to subset. Defaults to the script-level `dt`, so the
#                existing three-argument calls keep working unchanged.
#
# Returns:
#   The subset when `outfile` is NULL; otherwise NULL (invisibly) once the
#   file has been written.
split_sample_to_day = function(yd, date_column, outfile=NULL, data=dt){

	# which() discards NA comparisons, so rows with a missing date never match.
	day_rows = which(data[[date_column]] == yd)
	dt_yd_sample = data[day_rows, ]

	if (is.null(outfile)) {
		return(dt_yd_sample)
	}

	# Write under a temporary name and rename once the write has finished.
	# The caller skips any outfile that already exists, so a file truncated
	# by an interrupted run must never appear under the final name.
	tmp_outfile = paste0(outfile, '.tmp')
	write_fst(dt_yd_sample, tmp_outfile, compress = 100)
	if (!file.rename(tmp_outfile, outfile)) {
		stop('could not move ', tmp_outfile, ' to ', outfile)
	}
	invisible(NULL)
}

# Rows with a missing date cannot be assigned to a daily file: sort() below
# drops NA, so they are left out of the output. Report that instead of
# losing them silently.
n_missing_day = sum(is.na(dt[[date_column]]))
if (n_missing_day > 0) {
	logger::log_warn(paste0(n_missing_day, ' rows have a missing ', date_column,
		' and are not written to any daily file'))
}

# One output file per distinct value of the date column.
list_day = sort(unique(dt[[date_column]]))
if (length(list_day) > 0) {
	lits_day_outfile = file.path(target_dir,paste0(universe,'_',data_type,list_day,'.fst'))
} else {
	# paste0() would turn "no days" into one bogus file name, so keep this empty.
	lits_day_outfile = character(0)
}

# write_fst cannot create missing parent directories.
dir.create(target_dir, recursive = TRUE, showWarnings = FALSE)

logger::log_info('start to split...')

# seq_along() is empty for empty input, where 1:length() would iterate 1, 0.
# A for loop is used because only the side effect matters; a top-level
# lapply() would also print its list of NULLs when run through Rscript.
for (i in seq_along(list_day)) {

	if (file.exists(lits_day_outfile[i])){

		# Files from an earlier run are kept, which makes the job resumable.
		logger::log_info('already completed for ',lits_day_outfile[i])

	} else {

		logger::log_info('now running for ',lits_day_outfile[i])
		split_sample_to_day(list_day[i], date_column, lits_day_outfile[i])

	}
}

# After the daily files exist, re-open each one and record how many rows it
# holds; the per-file counts are written to `outfile`.
logger::log_info('read again to check data transfer ...')

# Read only the PATID column of one daily file and return a one-row summary.
count_rows_in_file = function(ff) {
	message('now reading ', ff)
	patid_only = read_fst(ff, columns='PATID')
	data.frame(file_name = ff, n_observation = nrow(patid_only))
}

metadata = rbindlist(lapply(lits_day_outfile, count_rows_in_file))
fwrite(metadata,outfile)



